compressable: Handle zstd frames correctly #5094

Conversation
This change is needed because, without it, we just received unknown zstd frames like:

forward: len=12984, head=54 d1 00 da cc 74 2e 4e 20 d6 2a ce 01 be f2 c2 | compressed(opt)=2

But the zstd specification requires every frame to start with the magic bytes, like:

forward: len=19835, head=28 b5 2f fd a0 c2 14 03 00 a4 d0 00 4a f7 30 39 | compressed(opt)=2

So, we need to attach the magic bytes `28 b5 2f fd` at the head of zstd-compressed payloads. This can be dumped with:

```diff
diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index 4c323bb..f192d6e3 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -672,6 +672,9 @@ module Fluent::Plugin
         sock.write @sender.forward_header # array, size=3
         sock.write tag.to_msgpack # 1. tag: String (str)
         chunk.open(compressed: @compress) do |chunk_io|
+          head = chunk_io.read(8) || ''.b
+          @log.info "debug: forward entries head", hex: head.bytes.map { |b| "%02x" % b }.join(' ')
+          chunk_io.rewind
           entries = [0xdb, chunk_io.size].pack('CN')
           sock.write entries.force_encoding(Encoding::UTF_8) # 2. entries: String (str32)
           IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es)
```

Signed-off-by: Hiroshi Hatake <[email protected]>
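For reference, the entries framing used in the diff above can be sketched in plain Ruby: the `0xdb` byte is the msgpack str32 marker, followed by a 4-byte big-endian length and the raw body. The body string below is only a stand-in, not a real compressed chunk.

```ruby
# Sketch of the forward-protocol entries framing used in out_forward:
# msgpack str32 = 0xdb marker + 4-byte big-endian length + raw bytes.
body   = "compressed-bytes-here".b          # stand-in for the compressed chunk
header = [0xdb, body.bytesize].pack('CN')   # same packing as in the diff above

p header.unpack('CN')   # => [219, 21]
p header.bytesize       # => 5
```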
@cosmo0920 When I try a simple test with `$ irb -rzstd-ruby -rstringio`:
irb(main):001> io = StringIO.new
=> #<StringIO:0x00007d7e3bab3c18>
irb(main):002> stream = Zstd::StreamWriter.new(io)
=> #<Zstd::StreamWriter:0x00007d7e3632d6b0 @io=#<StringIO:0x00007d7e3bab3c18>, @stream=#<Zstd::StreamingCompress:0x00007d7e3632a258>>
irb(main):003> stream.write("abc")
=> 12
irb(main):004> stream.finish
=> 3
irb(main):005> io.rewind
=> 0
irb(main):006> d=io.read
=> "(\xB5/\xFD\u0000X\u0018\u0000\u0000abc\u0001\u0000\u0000"
irb(main):008> d.unpack("H*")
=> ["28b52ffd0058180000616263010000"] I’ll also try to check this with Fluentd’s behavior. |
How about the case of class instances that mix in Enumerable?
Yup, at least out_forward does not attach the standardized zstd magic number when using zstd compression.
From RFC 8878:
Yes, Fluentd generates zstd-compressed payloads without this magic number. Also, the magic number is a little-endian value, so the byte sequence at the head is 28 b5 2f fd. Per https://datatracker.ietf.org/doc/html/rfc6713, we need to put this magic number at the head of the payloads:
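To make the frame-head discussion concrete, here is a minimal, dependency-free Ruby check; the helper name is mine, and the two sample payloads are the one captured in the irb session above and the head of the broken forward payload from the report.

```ruby
# RFC 8878: a zstd frame starts with magic number 0xFD2FB528,
# stored little-endian, i.e. the byte sequence 28 b5 2f fd.
ZSTD_MAGIC = "\x28\xb5\x2f\xfd".b

def zstd_frame_head?(payload)
  payload.b.start_with?(ZSTD_MAGIC)
end

good = ["28b52ffd0058180000616263010000"].pack("H*") # from the irb session above
bad  = ["54d100dacc742e4e20d62ace01bef2c2"].pack("H*") # broken forward payload head

puts zstd_frame_head?(good) # => true
puts zstd_frame_head?(bad)  # => false
```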
Sorry, I didn’t have much time today. I tested with the following config and debug patch:
<source>
@type sample
tag test.foo
</source>
<match test.**>
@type forward
compress zstd
<server>
host localhost
port 24224
</server>
<buffer>
@type memory
flush_mode interval
flush_interval 2s
</buffer>
</match>
<source>
@type forward
@label @SERVER
</source>
<label @SERVER>
<match **>
@type stdout
</match>
</label>
diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index 4c323bb0..977d99f6 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -672,6 +672,9 @@ module Fluent::Plugin
sock.write @sender.forward_header # array, size=3
sock.write tag.to_msgpack # 1. tag: String (str)
chunk.open(compressed: @compress) do |chunk_io|
+ head = chunk_io.read(8) || ''.b
+ @log.warn "debug: forward entries head", hex: head.bytes.map { |b| "%02x" % b }.join(' ')
+ chunk_io.rewind
entries = [0xdb, chunk_io.size].pack('CN')
sock.write entries.force_encoding(Encoding::UTF_8) # 2. entries: String (str32)
IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es)
I’ll check more patterns, including forwarding with Fluent Bit.
Hi, I rechecked and found that the issue does not occur when zstd frames are not concatenated. It can be reproduced with a file containing a huge number of lines as the tailing target (e.g. prepared with head into /path/to/tailing_target), using the in_tail plugin to ingest a large amount of events at once, which triggers this high-volume-specific behavior. To reproduce this issue, around 1700 lines of file content need to be ingested at once.
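A quick way to create such a tailing target; the path, line count, and content here are illustrative, not from the original report:

```shell
# Create a ~2000-line tailing target in one shot, so in_tail ingests a
# high volume at once (the report says about 1700 lines triggers the issue).
yes "sample log line" | head -n 2000 > /tmp/tailing_target
wc -l < /tmp/tailing_target
```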
How's this PR going, mate?
Sorry, I haven’t been able to make time over the past few days. |
I could reproduce this! Thanks!

<source>
@type sample
tag test.foo
size 5000 # This is important.
</source>
<match test.**>
@type forward
compress zstd
<server>
host localhost
port 24224
</server>
<buffer>
@type memory
flush_mode interval
flush_interval 2s
</buffer>
</match>
<source>
@type forward
@label @SERVER
</source>
<label @SERVER>
<match **>
@type stdout
</match>
</label>
Sorry for the delay. As already reported here, it appears that…
It's a really good point.
Thanks for making the patch for the upstream. I agree.
Although I haven’t thoroughly tested it and there are likely other parts that will require fixes as well, I think this approach would be a good direction to take for now.

diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb
index 2ec21229..a9344171 100644
--- a/lib/fluent/plugin/compressable.rb
+++ b/lib/fluent/plugin/compressable.rb
@@ -26,13 +26,13 @@ module Fluent
       io = output_io || StringIO.new
       if type == :gzip
         writer = Zlib::GzipWriter.new(io)
+        writer.write(data)
+        writer.finish
       elsif type == :zstd
-        writer = Zstd::StreamWriter.new(io)
+        io << Zstd.compress(data)
       else
         raise ArgumentError, "Unknown compression type: #{type}"
       end
-      writer.write(data)
-      writer.finish
       output_io || io.string
     end

It appears we don't need to use the streaming feature of zstd-ruby here.
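As a sanity check of the approach, here is a runnable sketch of the patched helper; it is a simplification, not the exact fluentd method. The gzip branch uses only the stdlib, while the zstd branch assumes the zstd-ruby gem's `Zstd.compress` (which emits a single self-contained frame, magic bytes included).

```ruby
require 'zlib'
require 'stringio'

# Simplified sketch of the patched Compressable#compress.
# The zstd branch assumes the zstd-ruby gem is loaded.
def compress(data, type: :gzip, output_io: nil)
  io = output_io || StringIO.new
  if type == :gzip
    writer = Zlib::GzipWriter.new(io)
    writer.write(data)
    writer.finish
  elsif type == :zstd
    io << Zstd.compress(data) # one frame, head = 28 b5 2f fd
  else
    raise ArgumentError, "Unknown compression type: #{type}"
  end
  output_io || io.string
end

puts Zlib.gunzip(compress("abc")) # => abc
```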
Does this patch work for an Enumerable-type data argument?
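One hypothetical way to cover that case: since `Zstd.compress` takes a single String, Enumerable input would first need to be joined (or each chunk compressed as its own frame). The adapter below is my sketch, not code from this PR.

```ruby
# Hypothetical adapter: coerce a String, or an Enumerable of string chunks,
# into one String before handing it to a one-shot compressor.
def coerce_chunks(data)
  data.is_a?(String) ? data : data.to_a.join
end

p coerce_chunks("baz")           # => "baz"
p coerce_chunks(%w[foo bar])     # => "foobar"
p coerce_chunks(["a", "b"].each) # Enumerators work too => "ab"
```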
Hi, does this kind of issue get resolved without this patch?
Note: Enumerable is used here (lines 65 to 68 in 14c0a0b; see also fluentd/lib/fluent/plugin/buffer/chunk.rb, lines 251 to 263 in 14c0a0b).
Thanks so much! After this issue itself is resolved, it might still be worth considering the point that we may not need to use the streaming feature of zstd-ruby.
I have confirmed this issue is resolved with zstd-ruby 1.5.7.1! |
It may be better to address this point in a separate PR. Perhaps we can close this PR now. |
Now, we need to take a benchmark comparing streaming zstd compression with normal (one-shot) zstd compression.
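A methodology sketch for that benchmark; gzip stand-ins from the stdlib are used so the snippet runs without zstd-ruby. For the real measurement, swap in `Zstd.compress` for the one-shot case and zstd-ruby's stream writer for the streaming case.

```ruby
require 'benchmark'
require 'zlib'
require 'stringio'

# Benchmark skeleton: one-shot vs. streaming compression.
# Gzip is only a stand-in here; replace with the zstd equivalents
# from zstd-ruby to measure the case discussed in this PR.
data = "sample log line\n" * 10_000

Benchmark.bm(10) do |x|
  x.report("one-shot")  { 50.times { Zlib.gzip(data) } }
  x.report("streaming") do
    50.times do
      io = StringIO.new
      w = Zlib::GzipWriter.new(io)
      w.write(data)
      w.finish
    end
  end
end
```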
Just closing this PR and updating the zstd dependency with a minimum required version would be one of the ideal solutions for mitigating this.
Ah, there might have been a misunderstanding. If the logic fundamentally requires the streaming feature, it wouldn’t be desirable to give it up just because the feature is still experimental.
Got it. The minimum required logic here is just terminating/creating each zstd frame with the correct data structure and leading magic bytes, with reasonable CPU and memory usage.
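As a rough diagnostic of that requirement, one can scan a payload for frame boundaries by looking for the magic bytes. This does not parse frame headers or sizes, so it is only a sketch; the sample frame is the 15-byte payload from the irb session above.

```ruby
# Scan a byte string for zstd frame starts (magic bytes 28 b5 2f fd).
# Diagnostic sketch only: real framing should parse frame headers instead.
MAGIC = "\x28\xb5\x2f\xfd".b

def frame_offsets(bytes)
  offsets = []
  i = 0
  while (i = bytes.index(MAGIC, i))
    offsets << i
    i += MAGIC.bytesize
  end
  offsets
end

# Two properly terminated frames concatenated back to back:
two_frames = ["28b52ffd0058180000616263010000"].pack("H*") * 2
p frame_offsets(two_frames) # => [0, 15]
```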
Agree.
Since I don’t have a deep understanding of the library’s implementation, this is only a guess, but I expected single compression to perform better in both CPU and memory usage, as streaming compression seems likely to be more complex. This is only my speculation, and you know the implementation of the library better than I do.
Thanks for your effort!
Which issue(s) this PR fixes:
None
What this PR does / why we need it:
This could be a known issue after merging #4657.
This is because, with that patch, we weren't able to decompress zstd-compressed payloads on the Fluent Bit side.
On our side, we need to emit explicit zstd frames with the leading magic bytes 28 b5 2f fd. However, zstd-ruby's stream writer does not wrap its compressed payloads with those zstd-specific frame headers.
So, we always experienced this kind of error when using Fluent Bit's development version of in_forward with zstd-compressed payloads.
With gzip-compressed payloads there is no issue, but the behavioral differences of the stream writer classes between gzip and zstd could cause this issue.
The related Fluent Bit's PR is:
fluent/fluent-bit#10710
Docs Changes:
Release Note: